package com.tangtang.basic.rxjava;

import rx.Observable;
import rx.Single;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class HelloWorld {
    public static void main(String[] args) {
//        Observable.create(emitter -> {
//            while (!emitter.isUnsubscribed()) {
//                long time = System.currentTimeMillis();
//                emitter.onNext(time);
//                if (time % 2 != 0) {
//                    emitter.onError(new IllegalStateException("Odd millisecond!"));
//                    break;
//                }
//            }
//        }).subscribe(System.out::println, Throwable::printStackTrace);


        Observable<String> stringObservable = Observable.from(new Future<String>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return false;
            }

            @Override
            public String get() throws InterruptedException, ExecutionException {
                Thread.sleep(5000);
                System.out.println(3);
                return "hello world";
            }

            @Override
            public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }
        }).asObservable();


        Func0<Observable<String>> observableFunc0 = new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                System.out.println(2);
                return stringObservable;
            }
        };
        final Func1<String, String> wrapWithAllOnNextHooks = new Func1<String, String>() {

            @Override
            public String call(String s) {
                System.out.println(s);
                System.out.println(1);
                return s;
            }
        };

        Observable<String> hystrixObservable =
                Observable.defer(observableFunc0)
                        .map(wrapWithAllOnNextHooks);

        ReplaySubject<String> replaySubject = ReplaySubject.create();

        Subscription subscribe = hystrixObservable.subscribe(replaySubject);



    }
}
